package com.bicycle.access.thread;

import com.alibaba.fastjson.JSON;
import com.bicycle.access.process.DataToMySqlService;
import com.bicycle.common.enums.MsgEnums;
import com.bicycle.service.dto.BaseJsonMsg;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.tomcat.util.http.ConcurrentDateFormat;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;


/**
 * 多线程
 */
@Slf4j
public class StatictisThread implements Runnable {

    private DataToMySqlService dataToMySqlService;

    private ConsumerRecord<Object, Object> consumerRecord;

    public StatictisThread(ConsumerRecord<Object, Object> consumerRecord,DataToMySqlService dataToMySqlService){
        this.consumerRecord = consumerRecord;
        this.dataToMySqlService = dataToMySqlService;
    }

    @Override
    public void run() {
        log.info("进来这里了>>>>>>>>>>>>>>>>>线程ID=" + Thread.currentThread().getId() + "，进入时间：" + LocalDateTime.now());
            try {
                log.info(">>>>>>Consumer:topic:" + consumerRecord.topic() + ",offset:" + consumerRecord.offset() + ",key:" + consumerRecord.key() + ",value:" + consumerRecord.value()+ ",partition:" + consumerRecord.partition());
                String jsonValue = consumerRecord.value().toString();
                BaseJsonMsg baseJsonMsg = JSON.parseObject(jsonValue, BaseJsonMsg.class);
                // 订单信息
                if (StringUtils.equals(baseJsonMsg.getType(), MsgEnums.MSG_ORDER.getTopicKey())) {
                    dataToMySqlService.orderInfo(baseJsonMsg.getJsonStr().toString());
                }
                // 车辆位置上报
                else if (StringUtils.equals(baseJsonMsg.getType(), MsgEnums.MSG_BICYCLE_SITE.getTopicKey())) {
                    dataToMySqlService.bicycleSite(baseJsonMsg.getJsonStr().toString());
                }
            } catch (Exception e) {
                log.error(">>>>>>>>>>>>>>>>>>消费错误error:", e);
                log.error(">>>>>>消费错误Consumer:topic:" + consumerRecord.topic() + ",offset:" + consumerRecord.offset() + ",key:" + consumerRecord.key() + ",value:" + consumerRecord.value());
            }
    }
}
